Installation Environment

# Disable the firewall
# Configure host name mapping
# Set up passwordless SSH login
# Prepare the installation packages: hadoop-2.8.5.tar.gz and flink-1.7.1-bin-hadoop28-scala_2.11.tar.gz
# Create a flink user; all subsequent steps are performed as the flink user
# Extract the Hadoop package into /data/apps on the flink01 node
tar -zxvf ~/hadoop-2.8.5.tar.gz -C /data/apps
# Extract the Flink package into /data/apps on the flink01 node
tar -zxvf ~/flink-1.7.1-bin-hadoop28-scala_2.11.tar.gz -C /data/apps
# Node configuration is as follows:

IP              hostname   Specs                                Processes
192.168.23.51   flink01    4-core CPU / 8 GB RAM / 50 GB disk   QuorumPeerMain, JournalNode, NodeManager, NameNode, DFSZKFailoverController, DataNode
192.168.23.52   flink02    4-core CPU / 8 GB RAM / 50 GB disk   QuorumPeerMain, JournalNode, NodeManager, NameNode, DFSZKFailoverController, DataNode, ResourceManager
192.168.23.53   flink03    4-core CPU / 8 GB RAM / 50 GB disk   QuorumPeerMain, JournalNode, NodeManager, ResourceManager, DataNode
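
The prerequisites above are only listed as comments. A minimal sketch of how they might be carried out (run as root on each node, CentOS 7 syntax) is shown below; the host names match the node table, but the exact commands depend on your OS and are an assumption rather than part of the original guide.

# Disable the firewall (CentOS 7; adjust for your distribution)
systemctl stop firewalld && systemctl disable firewalld
# Host name mapping: append the cluster nodes to /etc/hosts on every node
cat >> /etc/hosts <<'EOF'
192.168.23.51 flink01
192.168.23.52 flink02
192.168.23.53 flink03
EOF
# Create the flink user
useradd flink && passwd flink
# Passwordless SSH as the flink user: generate a key once, then copy it to every node (including the local one)
su - flink
ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa
for host in flink01 flink02 flink03; do ssh-copy-id flink@$host; done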

Hadoop HA Configuration

Enter the Hadoop configuration directory

# Enter the Hadoop configuration directory
cd /data/apps/hadoop-2.8.5/etc/hadoop

Configure the Java environment

# Set JAVA_HOME in hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_181
# Set JAVA_HOME in yarn-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_181
# Set JAVA_HOME in mapred-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_181
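
The same JAVA_HOME has to end up in all three env scripts; a small sketch that appends it in one pass is shown below, as an alternative to editing each file by hand (the JDK path is the one used throughout this guide).

# Append a concrete JAVA_HOME to the three env scripts (run in the configuration directory; the later definition wins when the script is sourced)
for f in hadoop-env.sh yarn-env.sh mapred-env.sh; do
  echo 'export JAVA_HOME=/usr/java/jdk1.8.0_181' >> "$f"
  tail -n 1 "$f"   # verify the appended line
done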

Configure slaves

vim slaves   # contents as follows
flink01
flink02
flink03

Configure core-site.xml

<configuration>
<!-- Set the HDFS nameservice to ns1 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns1</value>
</property>
<!-- Hadoop temporary data directory -->
<property>
<name>hadoop.tmp.dir</name>
<value>/data/apps/hadoop-2.8.5/tmp</value>
</property>
<!-- ZooKeeper quorum addresses -->
<property>
<name>ha.zookeeper.quorum</name>
<value>flink01:2181,flink02:2181,flink03:2181</value>
</property>
<property>
<name>ipc.client.connect.max.retries</name>
<value>100</value>
</property>
<property>
<name>ipc.client.connect.retry.interval</name>
<value>10000</value>
</property>
</configuration>

Configure hdfs-site.xml

<configuration>
<!-- HDFS replication factor -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<!-- Disable HDFS permission checking -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/apps/hadoop-2.8.5/tmp/dfs/name1,/data/apps/hadoop-2.8.5/tmp/dfs/name2</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/apps/hadoop-2.8.5/tmp/dfs/data1,/data/apps/hadoop-2.8.5/tmp/dfs/data2</value>
</property>
<!-- The HDFS nameservice is ns1; must match the value in core-site.xml -->
<property>
<name>dfs.nameservices</name>
<value>ns1</value>
</property>
<!-- ns1 has two NameNodes: nn1 and nn2 -->
<property>
<name>dfs.ha.namenodes.ns1</name>
<value>nn1,nn2</value>
</property>
<!-- RPC address of nn1 -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn1</name>
<value>flink01:9000</value>
</property>
<!-- HTTP address of nn1 -->
<property>
<name>dfs.namenode.http-address.ns1.nn1</name>
<value>flink01:50070</value>
</property>
<!-- RPC address of nn2 -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn2</name>
<value>flink02:9000</value>
</property>
<!-- HTTP address of nn2 -->
<property>
<name>dfs.namenode.http-address.ns1.nn2</name>
<value>flink02:50070</value>
</property>
<!-- Where the NameNode metadata (edit log) is stored on the JournalNodes -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://flink01:8485;flink02:8485;flink03:8485/ns1</value>
</property>
<!-- Local directory where each JournalNode stores its data -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data/apps/hadoop-2.8.5/tmp/journal</value>
</property>
<!-- Enable automatic NameNode failover -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- Failover proxy provider used by clients to find the active NameNode -->
<property>
<name>dfs.client.failover.proxy.provider.ns1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- Fencing methods; multiple methods are separated by newlines, one method per line.
sshfence: if the active NameNode fails and the standby becomes active while the old active process has not actually stopped serving, the old process is forcibly killed over SSH.
shell(/bin/true): if the active NameNode and its ZKFC die together, nobody notifies ZooKeeper; when the standby needs to take over it invokes this shell command, and because it returns true the failover is allowed to succeed.
-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>
<!-- The sshfence method requires passwordless SSH -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/flink/.ssh/id_rsa</value>
</property>
<!-- Timeout for the sshfence method, in milliseconds -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>30000</value>
</property>
</configuration>
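
The nameservice and NameNode settings above can be sanity-checked with hdfs getconf, which only reads the configuration files and therefore works before the cluster is started; a minimal sketch, run from /data/apps/hadoop-2.8.5, is shown below.

# Confirm that the HA settings are picked up
bin/hdfs getconf -confKey dfs.nameservices        # expected: ns1
bin/hdfs getconf -namenodes                       # expected: flink01 flink02
bin/hdfs getconf -confKey dfs.ha.namenodes.ns1    # expected: nn1,nn2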

Configure mapred-site.xml

<configuration>
<!-- Run MapReduce on YARN -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- Memory requested by a single map task -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<!-- Memory requested by a single reduce task -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>2048</value>
</property>
<!-- Uber mode is a Hadoop 2 optimization for small jobs: if a job is small enough, its tasks are run to completion in a single JVM. -->
<property>
<name>mapreduce.job.ubertask.enable</name>
<value>true</value>
</property>
<!-- JobHistory server web UI address -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>flink01:19888</value>
</property>
</configuration>
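
The config above points the JobHistory web UI at flink01:19888, but the guide never starts that daemon. If you want it, the usual start command (run on flink01 from the Hadoop home directory once HDFS and YARN are up) is sketched below.

# Start the MapReduce JobHistory server on flink01
sbin/mr-jobhistory-daemon.sh start historyserver
# The web UI should then be reachable at http://flink01:19888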

Configure yarn-site.xml

<configuration>
<!-- Enable ResourceManager HA -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- ResourceManager cluster id -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>rmcluster</value>
</property>
<!-- Logical ids of the ResourceManagers -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- Host of each ResourceManager -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>flink02</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>flink03</value>
</property>
<!-- ZooKeeper quorum addresses -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>flink01:2181,flink02:2181,flink03:2181</value>
</property>
<!-- Enable RM state recovery -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!-- Store the ResourceManager state in the ZooKeeper cluster -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<!-- Auxiliary services run by the NodeManager -->
<property>
<description>A comma separated list of services where service name should only
contain a-zA-Z0-9_ and can not start with numbers</description>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- Maximum number of ApplicationMaster attempts -->
<property>
<description>The maximum number of application attempts. It's a global
setting for all application masters. Each application master can specify
its individual maximum number of application attempts via the API, but the
individual number cannot be more than the global upper bound. If it is,
the resourcemanager will override it. The default number is set to 2, to
allow at least one retry for AM.</description>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>100</value>
</property>
<!-- Enforce physical memory limits on containers -->
<property>
<description>Whether physical memory limits will be enforced for
containers.</description>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>true</value>
</property>
<!-- Do not enforce virtual memory limits on containers -->
<property>
<description>Whether virtual memory limits will be enforced for
containers.</description>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<!-- Ratio of virtual to physical memory -->
<property>
<description>Ratio between virtual memory to physical memory when
setting memory limits for containers. Container allocations are
expressed in terms of physical memory, and virtual memory usage
is allowed to exceed this allocation by this ratio.
</description>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>5</value>
</property>
<!-- Minimum memory per container request -->
<property>
<description>The minimum allocation for every container request at the RM,
in MBs. Memory requests lower than this will throw a
InvalidResourceRequestException.</description>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value>
</property>
<!-- Maximum memory per container request -->
<property>
<description>The maximum allocation for every container request at the RM,
in MBs. Memory requests higher than this will throw a
InvalidResourceRequestException.</description>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>6144</value>
</property>
<!-- Minimum virtual CPU cores per container request -->
<property>
<description>The minimum allocation for every container request at the RM,
in terms of virtual CPU cores. Requests lower than this will throw a
InvalidResourceRequestException.</description>
<name>yarn.scheduler.minimum-allocation-vcores</name>
<value>1</value>
</property>
<!-- Maximum virtual CPU cores per container request -->
<property>
<description>The maximum allocation for every container request at the RM,
in terms of virtual CPU cores. Requests higher than this will throw a
InvalidResourceRequestException.</description>
<name>yarn.scheduler.maximum-allocation-vcores</name>
<value>8</value>
</property>
<!-- Maximum physical memory the NodeManager can allocate to containers -->
<property>
<description>Amount of physical memory, in MB, that can be allocated
for containers.</description>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>6144</value>
</property>
<!-- Maximum number of vcores the NodeManager can allocate to containers -->
<property>
<description>Number of vcores that can be allocated
for containers. This is used by the RM scheduler when allocating
resources for containers. This is not used to limit the number of
CPUs used by YARN containers. If it is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically determined from the hardware in case of Windows and Linux.
In other cases, number of vcores is 8 by default.</description>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>8</value>
</property>
<!-- Enable log aggregation -->
<property>
<description>Whether to enable log aggregation. Log aggregation collects
each container's logs and moves these logs onto a file-system, for e.g.
HDFS, after the application completes. Users can configure the
"yarn.nodemanager.remote-app-log-dir" and
"yarn.nodemanager.remote-app-log-dir-suffix" properties to determine
where these logs are moved to. Users can access the logs via the
Application Timeline Server.</description>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- How long the NodeManager retains user logs locally; only applies when log aggregation is disabled -->
<property>
<description>Time in seconds to retain user logs. Only applicable if
log aggregation is disabled</description>
<name>yarn.nodemanager.log.retain-seconds</name>
<value>10800</value>
</property>
</configuration>

Configure capacity-scheduler.xml

<property>
<name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
<value>0.3</value>
<description>Maximum percentage of resources in the cluster which can be used to run application masters.</description>
</property>
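
The guide extracts and configures Hadoop only on flink01, so at this point the installation directory presumably has to be copied to the other two nodes. A minimal sketch using rsync is shown below (scp works just as well); it assumes /data/apps already exists and is writable by the flink user on flink02 and flink03. This step is an assumption, not part of the original text.

# Distribute the configured Hadoop installation from flink01 to the other nodes
for host in flink02 flink03; do
  rsync -az /data/apps/hadoop-2.8.5 flink@$host:/data/apps/
done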

Start the ZooKeeper Cluster

# Run the following command on flink01, flink02, and flink03
bin/zkServer.sh start
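
After starting all three servers, it is worth confirming that the quorum has formed; one node should report "leader" and the other two "follower". A small check, run on each node from the ZooKeeper home directory, is sketched below.

# Check the ZooKeeper server state on each node
bin/zkServer.sh status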

Initialize the Hadoop Environment

Start the JournalNodes

# Run the following command on flink01, flink02, and flink03
sbin/hadoop-daemon.sh start journalnode

Format the NameNode

# Run the following command on flink01
bin/hdfs namenode -format

Format the HA znode in ZooKeeper

# Run the following command on flink01
bin/hdfs zkfc -formatZK
# After it completes, a znode is created in ZooKeeper. To check that it exists:
# go to the ZooKeeper home directory and run bin/zkCli.sh to connect, then execute: ls /
# If hadoop-ha appears in the output, the format succeeded.

Start the Primary NameNode

# Run the following command on flink01
sbin/hadoop-daemon.sh start namenode

Sync the Standby NameNode with the Primary

# Run the following command on flink02
bin/hdfs namenode -bootstrapStandby

Stop All Running JournalNodes and the Primary NameNode

# Run the following command on flink01
sbin/stop-dfs.sh

Start the Hadoop Cluster

Start HDFS

# Run the following command on flink01 (it is recommended to start all JournalNodes first, to avoid NameNode-to-JournalNode connection timeouts)
sbin/start-dfs.sh
# Check the state of the two NameNodes
bin/hdfs haadmin -getServiceState nn1   # state of nn1
bin/hdfs haadmin -getServiceState nn2   # state of nn2
# Manually switch a NameNode's state (not used here; run only if needed)
bin/hdfs haadmin -transitionToActive nn1    # switch to active
bin/hdfs haadmin -transitionToStandby nn1   # switch to standby

Start YARN

# Run the following command on flink02
sbin/start-yarn.sh
# Run the following command on flink03
sbin/yarn-daemon.sh start resourcemanager
# Check the state of the two ResourceManagers
bin/yarn rmadmin -getServiceState rm1   # state of rm1
bin/yarn rmadmin -getServiceState rm2   # state of rm2
# While the flink02 ResourceManager is active, visiting the flink03 ResourceManager web UI automatically redirects to flink02
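
At this point every daemon from the node table at the top of this guide should be running; a short verification sketch (run from the Hadoop home directory where applicable) is shown below. The expected process lists come from that table.

# On each node, list the running Java processes and compare with the node table
jps
# e.g. on flink01 you should see QuorumPeerMain, JournalNode, NameNode,
# DFSZKFailoverController, DataNode, and NodeManager
# Confirm that all three NodeManagers have registered with YARN
bin/yarn node -list
# Confirm that HDFS sees all three DataNodes
bin/hdfs dfsadmin -report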

Enter the Flink configuration directory

cd /data/apps/flink-1.7.1/conf

Edit flink-conf.yaml; the comments below are taken from the Flink configuration documentation.

# The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.
jobmanager.rpc.address: flink01
# JVM heap size for the JobManager.
jobmanager.heap.size: 1024m
# JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.
taskmanager.heap.size: 2048m
# The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
taskmanager.numberOfTaskSlots: 4
# Default parallelism for jobs.
parallelism.default: 2

# Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.
# high-availability mode (required): The high-availability mode has to be set in conf/flink-conf.yaml to zookeeper in order to enable high availability mode. Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance.
high-availability: zookeeper
# File system path (URI) where Flink persists metadata in high-availability setups.
# Storage directory (required): JobManager metadata is persisted in the file system storageDir and only a pointer to this state is stored in ZooKeeper.
# The storageDir stores all metadata needed to recover a JobManager failure.
high-availability.storageDir: hdfs://ns1/flink/recovery
# The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
# ZooKeeper quorum (required): A ZooKeeper quorum is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
high-availability.zookeeper.quorum: flink01:2181,flink02:2181,flink03:2181
# The root path under which Flink stores its entries in ZooKeeper.
# ZooKeeper root (recommended): The root ZooKeeper node, under which all cluster nodes are placed.
high-availability.zookeeper.path.root: /flink
# yarn.application-attempts: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the Application master fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.
yarn.application-attempts: 100

# The state backend to be used to store and checkpoint state.
state.backend: rocksdb
# The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).
state.checkpoints.dir: hdfs://ns1/flink/flink-checkpoints
# The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend).
state.savepoints.dir: hdfs://ns1/flink/save-checkpoints
# Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Some state backends may not support incremental checkpoints and ignore this option.
state.backend.incremental: true

# Directories for temporary files, separated by ",", "|", or the system's java.io.File.pathSeparator.
io.tmp.dirs: /data/apps/flinkapp/tmp
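
The configuration above references a local temporary directory and several HDFS paths. Flink normally creates the HDFS paths on demand, so pre-creating them is optional; a small sketch (an optional step, not in the original text, assuming the Hadoop bin directory is on the PATH) is shown below.

# Create the local temporary directory on every node that will run Flink processes
mkdir -p /data/apps/flinkapp/tmp
# Optionally pre-create the HDFS directories used for HA metadata, checkpoints and savepoints
hdfs dfs -mkdir -p /flink/recovery /flink/flink-checkpoints /flink/save-checkpoints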

Important: when running Flink on YARN in HA mode, never set high-availability.cluster-id manually

# The ID of the Flink cluster, used to separate multiple Flink clusters from each other. Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.
# ZooKeeper cluster-id (recommended): The cluster-id ZooKeeper node, under which all required coordination data for a cluster is placed.
# Important: You should not set this value manually when running a YARN cluster, a per-job YARN session, or on another cluster manager. In those cases a cluster-id is automatically being generated based on the application id. Manually setting a cluster-id overrides this behaviour in YARN. Specifying a cluster-id with the -z CLI option, in turn, overrides manual configuration. If you are running multiple Flink HA clusters on bare metal, you have to manually configure separate cluster-ids for each cluster.
high-availability.cluster-id: /default

Replace the Logging Framework with Logback

# Remove the log4j and slf4j-log4j12 jars from Flink's lib directory (e.g. log4j-1.2.17.jar and slf4j-log4j12-1.7.15.jar)
# Remove the log4j-related configuration files from Flink's conf directory (log4j-cli.properties, log4j-console.properties, log4j.properties, log4j-yarn-session.properties)
# Add logback-classic.jar, logback-core.jar and log4j-over-slf4j.jar to Flink's lib directory
# Provide your own logback configuration by overwriting logback.xml, logback-console.xml and logback-yarn.xml in Flink's conf directory
# Flink started with flink-daemon.sh uses logback.xml
# Flink started with flink-console.sh uses logback-console.xml
# Flink started with yarn-session.sh uses logback-yarn.xml
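
A minimal sketch of the jar swap described above, assuming the three logback-related jars have already been downloaded to the flink user's home directory (the exact versions are not specified in the original text and are up to you).

cd /data/apps/flink-1.7.1
# Move the log4j binding out of the lib directory
mkdir -p lib_removed && mv lib/log4j-*.jar lib/slf4j-log4j12-*.jar lib_removed/
# Remove the log4j configuration files
rm -f conf/log4j*.properties
# Drop in the previously downloaded logback jars
cp ~/logback-classic-*.jar ~/logback-core-*.jar ~/log4j-over-slf4j-*.jar lib/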

Example logback-yarn.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- Directory where log files are stored; do not use a relative path -->
<property name="LOG_HOME" value="/data/apps/flinkapp/logs"/>

<!-- Output pattern: %d is the date, %thread the thread name, %-5level the level padded to 5 characters, %msg the log message, %n a newline -->
<property name="pattern" value="%d{yyyyMMdd:HH:mm:ss.SSS} [%thread] %-5level %msg%n"/>

<!-- Console output -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>-->
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${pattern}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<!-- INFO_FILE -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/info/info.log</file>
<!-- Accept INFO only -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/info/info_%d{yyyy-MM-dd}.log.%i.gz</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>10MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--<maxHistory>30</maxHistory>-->
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${pattern}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<!-- ERROR_FILE -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/error/error.log</file>
<!-- Accept ERROR only -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/error/error_%d{yyyy-MM-dd}.log.%i.gz</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>10MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--<maxHistory>30</maxHistory>-->
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${pattern}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>

<logger name="com.haier.flink" level="DEBUG" additivity="true">
<appender-ref ref="INFO_FILE"/>
<appender-ref ref="ERROR_FILE"/>
</logger>

<logger name="java.sql.Connection" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="java.sql.Statement" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="java.sql.PreparedStatement" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<!-- Root logger -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

</configuration>

Start a yarn-session

# Start a yarn-session (named FlinkTestCluster)
# JobManager memory: 2048 MB
# Each TaskManager gets 2048 MB of memory and 4 slots (the session cluster will automatically allocate additional containers, which run the TaskManagers, when jobs are submitted to the cluster)
# Start in detached mode
yarn-session.sh -jm 2048 -tm 2048 -s 4 -nm FlinkTestCluster -d
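
Once the session is up, jobs can be submitted to it with the regular Flink client run from /data/apps/flink-1.7.1; a minimal sketch using the WordCount example bundled with the distribution is shown below. By default the client finds the running yarn-session through the properties file written by yarn-session.sh, so no extra flags should be needed.

# Submit the bundled WordCount example to the running yarn-session
bin/flink run ./examples/batch/WordCount.jar
# List running and scheduled jobs
bin/flink list
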
HA test matrix (configuration / test / observed behaviour / notes):

1. Configuration: the job itself uses a Flink restart strategy. Test: submit a buggy program so the job fails. Behaviour: the failed job is restarted. Notes: verifies job HA.
2. Configuration: yarn.resourcemanager.am.max-attempts in YARN's yarn-site.xml and yarn.application-attempts in Flink's flink-conf.yaml. Test: kill the YarnSessionClusterEntrypoint process (the process shared by the JobManager and the AM). Behaviour: the JobManager/AM is restarted, possibly on a different node and with a new process id, and all jobs restart. Notes: verifies JobManager HA.
3. Configuration: a Flink restart strategy on the job, plus yarn.resourcemanager.am.max-attempts and yarn.application-attempts. Test: kill a YarnTaskExecutorRunner process (a TaskManager). Behaviour: the TaskManager is restarted, possibly on a different node and with a new process id, and the jobs that ran on the killed TaskManager restart. Notes: verifies TaskManager HA.
4. Configuration: yarn.resourcemanager.am.max-attempts, plus high-availability.zookeeper.quorum, high-availability.storageDir, high-availability.zookeeper.path.root and yarn.application-attempts in flink-conf.yaml. Test: without cancelling the jobs in the Flink cluster, accidentally kill the yarn-session (one Application in the YARN queue), then resubmit a yarn-session from the command line. Behaviour: a new yarn-session starts, the jobs that were never cancelled migrate to it automatically, and the JobManager and TaskManagers are created automatically. Notes: verifies yarn-session HA.
5. Configuration: as in 4, plus YARN HA. Test: kill a ResourceManager. Behaviour: the ResourceManager fails over to the other node, the yarn-session restarts, and all jobs restart. Notes: verifies YARN HA.
6. Configuration: as in 4 (the flink-conf.yaml options can also be passed with -D when the yarn-session is submitted), plus HDFS HA. Test: kill a NameNode. Behaviour: the NameNode fails over to the other node. Notes: verifies HDFS HA.

The Basic Idea of YARN

The fundamental idea of YARN is to split the functionality of resource management and job scheduling/monitoring into separate daemons. The idea is to have a global ResourceManager (RM) and a per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs.

The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine agent that is responsible for containers, monitoring their resource usage (CPU, memory, disk, network) and reporting it to the ResourceManager/Scheduler.

The per-application ApplicationMaster is in effect a framework-specific library whose task is to negotiate resources from the ResourceManager and work with the NodeManagers to execute and monitor the tasks.

MapReduce NextGen Architecture

The ResourceManager has two main components: the Scheduler and the ApplicationsManager.

The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints such as capacities and queues. It is a pure scheduler in the sense that it performs no monitoring or tracking of application status, and it offers no guarantee of restarting tasks that fail due to application or hardware errors. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so using the abstract notion of a resource Container, which incorporates elements such as memory, CPU, disk, and network.

The Scheduler has a pluggable policy that is responsible for partitioning the cluster resources among the various queues, applications, and so on. The current schedulers, such as the CapacityScheduler and the FairScheduler, are examples of such plugins.

The ApplicationsManager is responsible for accepting job submissions, negotiating the first container for executing the application-specific ApplicationMaster, and providing the service for restarting the ApplicationMaster container on failure. Each per-application ApplicationMaster is responsible for negotiating appropriate resource containers from the Scheduler, tracking their status, and monitoring progress.


The YARN client needs access to the Hadoop configuration in order to connect to the YARN ResourceManager and to HDFS. It determines the Hadoop configuration using the following strategy:

  • It tests in order whether YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_CONF_PATH is set. If one of these variables is set, it is used to read the configuration.
  • If the above strategy fails (which should not happen in a correct YARN setup), the client falls back to the HADOOP_HOME environment variable. If HADOOP_HOME is set, the client tries to access $HADOOP_HOME/etc/hadoop (Hadoop 2) or $HADOOP_HOME/conf (Hadoop 1). A sketch of exporting the configuration directory for this cluster is shown right below.
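
For the layout used in this guide, exporting one of the variables above before running yarn-session.sh is enough; a minimal sketch is shown below (persisting it in ~/.bashrc is an assumption, not part of the original text).

# Point the Flink YARN client at this cluster's Hadoop configuration
export HADOOP_CONF_DIR=/data/apps/hadoop-2.8.5/etc/hadoop
# Persist it for the flink user
echo 'export HADOOP_CONF_DIR=/data/apps/hadoop-2.8.5/etc/hadoop' >> ~/.bashrc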

When a new Flink YARN session is started, the client first checks whether the requested resources (memory and vcores for the ApplicationMaster) are available. After that, it uploads the Flink jar and the configuration to HDFS (step 1).

The client's next step is to request a YARN container (step 2) in which to start the ApplicationMaster (step 3). Because the client registered the configuration and jar files as resources for the container, the NodeManager running on that particular machine takes care of preparing the container (for example, downloading the files). Once that has finished, the ApplicationMaster (AM) is started.

The JobManager and the AM run in the same container. Once they have started successfully, the AM knows the address of the JobManager (its own host). It generates a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager) and uploads it to HDFS as well. Additionally, the AM container serves Flink's web interface. All ports allocated by YARN are ephemeral ports, which allows users to run multiple Flink YARN sessions in parallel.

After that, the AM starts allocating containers for Flink's TaskManagers (step 4), which download the jar file and the modified configuration from HDFS. Once these steps are complete, Flink is set up and ready to accept jobs.

References

HDFS High Availability Using the Quorum Journal Manager

ResourceManager High Availability

Apache Hadoop YARN

JobManager High Availability (HA)

Flink on Yarn